Skip to content

refactor(BA-5859): Stream container stats from Docker instead of polling#11224

Open
rapsealk wants to merge 6 commits into
mainfrom
perf/11219-stream-container-stats
Open

refactor(BA-5859): Stream container stats from Docker instead of polling#11224
rapsealk wants to merge 6 commits into
mainfrom
perf/11219-stream-container-stats

Conversation

@rapsealk
Copy link
Copy Markdown
Member

@rapsealk rapsealk commented Apr 22, 2026

Closes #11219 (BA-5859)
Refs #11216

Summary

  • DockerStatsStreamer: a long-lived reader keyed by container ID. Callers read the most recent sample from memory instead of container.stats(stream=False) per collection cycle.
  • Eager start() / stop() wired through container-start / container-clean events on the agent (no lazy-on-first-get race).
  • Bounded exponential-backoff reconnect (1s → 30s, 8 retries) on transient ClientConnectionError / TimeoutError.
  • CancelledError re-raised; non-transient DockerError (e.g. 404 for container-gone) exits cleanly without retry spin.
  • Renamed _CONTAINER_STAT_TIMEOUT_CONTAINER_INSPECT_TIMEOUT to reflect its remaining scope.

Why

Cuts API-path stat latency from 100-500 ms per container to a memory read, and removes the 2s per-container timeout as a serial-blocker. Part of epic #11216.

Known trade-offs (tracked as follow-ups)

Test plan

  • pants test tests/unit/agent:: passes (lifecycle, reconnect, shutdown, 404 cleanup)
  • Run an agent locally; confirm CPU/memory stats populate for running containers
  • Stop a container mid-stream; confirm the reader task exits cleanly and get_latest() returns None
  • docker restart mid-session; confirm reconnect kicks in and stats resume

Callers of the Docker API stats path (CPUPlugin and MemoryPlugin in
DOCKER mode) now read the most recent sample from an in-memory cache
kept up-to-date by a long-lived `container.stats(stream=True)` reader
per container, instead of issuing a fresh `stats(stream=False)` HTTP
round-trip every collection cycle. This cuts per-container stat latency
from ~100-500 ms to a memory read and removes the 2 s per-container
timeout as a serial-blocker.

Readers self-terminate when the upstream iterator ends or a connection
error occurs; a subsequent lookup respawns the reader if the container
reappears. Full close is tied to plugin cleanup.

Closes #11219

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 22, 2026 04:33
@rapsealk rapsealk added this to the 26.5 milestone Apr 22, 2026
@github-actions github-actions Bot added size:L 100~500 LoC comp:agent Related to Agent component labels Apr 22, 2026
@rapsealk rapsealk changed the title enhance(agent): stream container stats from Docker instead of polling refactor(agent): Stream container stats from Docker instead of polling Apr 22, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves Docker-mode container CPU/memory metric collection by switching from per-cycle container.stats(stream=False) polling to an in-process cache populated by long-lived stats(stream=True) readers, reducing per-container latency and avoiding serial blocking on timeouts.

Changes:

  • Add DockerStatsStreamer to maintain a per-container streamed stats reader and cache the latest sample in memory.
  • Update CPUPlugin and MemoryPlugin to read stats from the streamer cache instead of calling fetch_api_stats() each cycle.
  • Update unit tests to inject a mocked/prewarmed stats streamer into the intrinsic plugins.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
src/ai/backend/agent/docker/intrinsic.py Introduces DockerStatsStreamer, wires it into CPU/Memory intrinsic plugins, and refactors the API stats path to read from cache.
tests/unit/agent/test_docker_intrinsic.py Adjusts fixtures to provide a mocked DockerStatsStreamer so plugin tests don’t depend on live Docker stats calls.
changes/11219.enhance.md Adds a changelog entry describing the streamed stats approach.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

e,
)
finally:
self._latest.pop(container_id, None)
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DockerStatsStreamer._read_stream() removes _latest in finally, but it never removes the completed task from _tasks. If containers churn, _tasks will grow unbounded with done tasks (and possibly their exception tracebacks), which is effectively a memory leak. Consider popping container_id from _tasks in finally (or attaching a task.add_done_callback to self-clean), and/or calling stop() when the stream ends.

Suggested change
self._latest.pop(container_id, None)
self._latest.pop(container_id, None)
current_task = asyncio.current_task()
if current_task is not None and self._tasks.get(container_id) is current_task:
self._tasks.pop(container_id, None)

Copilot uses AI. Check for mistakes.
Comment on lines 293 to 299
async def init(self, context: Any | None = None) -> None:
self._docker = Docker()
self._stats_streamer = DockerStatsStreamer(self._docker)

async def cleanup(self) -> None:
await self._stats_streamer.close()
await self._docker.close()
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CPUPlugin creates its own DockerStatsStreamer (and its own Docker client). Since MemoryPlugin does the same, this will open two long-lived stats(stream=True) connections per container (one per plugin), which can negate the intended resource savings and put unnecessary load on dockerd for hosts with many containers. Consider sharing a single streamer (and ideally a single Docker client) across intrinsic plugins, e.g., via an agent-level/shared context or a module-level singleton keyed by the underlying Docker client.

Copilot uses AI. Check for mistakes.
Comment on lines 618 to 624
async def init(self, context: Any | None = None) -> None:
self._docker = Docker()
self._stats_streamer = DockerStatsStreamer(self._docker)

async def cleanup(self) -> None:
await self._stats_streamer.close()
await self._docker.close()
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MemoryPlugin also instantiates its own DockerStatsStreamer, which (together with CPUPlugin’s streamer) results in duplicate per-container stats streams. To avoid 2× open connections per container, consider sharing a single streamer across intrinsic plugins (and/or reusing a shared Docker client).

Copilot uses AI. Check for mistakes.
Comment on lines +140 to +174
@@ -149,7 +155,7 @@ async def fetch_api_stats(container: DockerContainer) -> dict[str, Any] | None:
)
return None
else:
entry = {"read": "0001-01-01"}
entry: dict[str, Any] = {"read": "0001-01-01"}
# aiodocker 0.16 or later returns a list of dict, even when not streaming.
match ret:
case list() if ret:
@@ -164,9 +170,102 @@ async def fetch_api_stats(container: DockerContainer) -> dict[str, Any] | None:
ret,
)
return None
if entry["read"].startswith("0001-01-01") or entry["preread"].startswith("0001-01-01"):
return _validate_stats_entry(entry)

Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetch_api_stats() no longer appears to be used anywhere in the Docker agent path (call sites were replaced by DockerStatsStreamer.get_latest()), and repository-wide search shows no remaining references. Keeping this unused function increases maintenance surface and can confuse future changes; consider removing it (or clearly documenting it as a legacy fallback) and updating tests that still patch it.

Copilot uses AI. Check for mistakes.
Comment on lines +176 to +216
class DockerStatsStreamer:
"""
Maintains one long-lived `container.stats(stream=True)` reader per container and
exposes the most recent decoded sample from an in-memory cache.

Callers read the cached sample via :meth:`get_latest` instead of issuing a new
HTTP round-trip every collection cycle. The first call for a previously-unseen
container lazily spawns the reader; no sample is returned until dockerd has
emitted at least one frame.

Readers self-terminate when the upstream iterator ends (container removed) or
raises a connection error. A subsequent :meth:`get_latest` call starts a fresh
reader if the container reappears.

TODO(#11219): Wire start/stop into the agent's container-lifecycle hooks
(_handle_start_event / _handle_clean_event) to avoid relying on lazy startup.
"""

_docker: Docker
_latest: dict[str, dict[str, Any]]
_tasks: dict[str, asyncio.Task[None]]
_closed: bool

def __init__(self, docker: Docker) -> None:
self._docker = docker
self._latest = {}
self._tasks = {}
self._closed = False

def get_latest(self, container_id: str) -> dict[str, Any] | None:
"""Return the most recent cached sample for `container_id`, starting a
reader if none is running. Returns None until the first frame arrives."""
if self._closed:
return None
return entry
task = self._tasks.get(container_id)
if task is None or task.done():
self._tasks[container_id] = asyncio.create_task(
self._read_stream(container_id),
name=f"docker-stats-stream:{container_id[:7]}",
)
return self._latest.get(container_id)
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DockerStatsStreamer introduces new lifecycle/caching behavior (lazy task spawn, stream termination handling, close() cancellation), but the unit tests only mock it rather than exercising its real behavior. Adding focused tests for get_latest()/close() (and for stream termination cleaning up internal state like _tasks) would help prevent regressions and catch resource-leak scenarios early.

Copilot uses AI. Check for mistakes.
rapsealk and others added 2 commits April 22, 2026 14:12
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…andling

Addresses review blockers on PR #11224:
- Eager start/stop of stream tasks from container create/destroy events
  instead of lazy-on-first-call, eliminating the cold-start cycle that
  returned None for CPU/memory on newly-created containers.
- Re-raise asyncio.CancelledError from the stream reader; log other
  exceptions with container id; stop swallowing BackendAIError silently.
- Reconnect with bounded exponential backoff on ClientConnectionError.
- Ensure close() cancels all in-flight stream tasks.
- Rename _CONTAINER_STAT_TIMEOUT to _CONTAINER_INSPECT_TIMEOUT to match
  its current usage.
- Add lifecycle/reconnect/shutdown tests.

Refs #11219
Refs #11224

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added size:XL 500~ LoC and removed size:L 100~500 LoC labels Apr 22, 2026
@rapsealk
Copy link
Copy Markdown
Member Author

Review summary from an independent pass + blocker resolutions pushed to this branch (commit 43e9ff7c0).

Verdict: ship with changes — blockers resolved.

Finding Severity Resolution
DockerStatsStreamer.stop() defined but never called — container destruction leaked the reader task until dockerd closed the stream Blocker Eager start() now fires from _handle_start_event; stop() from _handle_clean_event. No-op default hooks notify_container_started/destroyed added to AbstractComputePlugin so k8s/dummy intrinsics inherit them. TODO(#11219) removed
Lazy-on-first-call start meant the first stat cycle returned None for CPU/memory on newly-created containers (regression vs. stats(stream=False)) Blocker Eager start gives dockerd ~1s head-start before the first collection cycle. Opt-in wait_for_first_sample(cid, timeout=0.5) available when callers want a blocking guarantee
except (asyncio.CancelledError, Exception): pass swallowed cancellation and BackendAIError alike Major Reader re-raises CancelledError; logs others with cid; new ContainerStatsStreamError(BackendAIError) on permanent failure
No reconnect on ClientConnectionError Major Bounded exponential backoff (1s → 30s, 8 retries), then raises ContainerStatsStreamError
close() didn't cancel in-flight readers Minor Now awaits cancellation of all active tasks
_CONTAINER_STAT_TIMEOUT misnamed after refactor (only used for container.show() now) Nit Renamed to _CONTAINER_INSPECT_TIMEOUT

Tests added (9 new): test_start_spawns_reader_and_first_sample_lands, test_stop_cancels_reader_and_drops_cache, test_close_cancels_all_in_flight_tasks, test_first_sample_wait_times_out_when_no_frames, test_reconnect_after_client_connection_error, test_cancelled_error_reraises_from_reader, test_notify_started_calls_plugin_hook, test_notify_destroyed_calls_plugin_hook, test_notify_tolerates_plugin_exceptions.

Coordination note: if #11223 (sysfs-first) merges first, the Docker stream is only needed for network/IO fields — streaming all fields on every container is wasteful. Worth a follow-up to narrow the streamed fields, but out of scope here.

Deferred: CPUPlugin and MemoryPlugin each own a separate DockerStatsStreamer, so two streams open per container. Consolidating is a follow-up refactor.

@rapsealk rapsealk requested review from a team and achimnol April 22, 2026 06:17
- Drop DockerStatsStreamer.wait_for_first_sample and its backing
  first-sample events/constant/test. It was an opt-in helper with no
  production caller.
- Drop the orphaned fetch_api_stats + mock_fetch_api_stats fixture; both
  api_impl paths now go through self._stats_streamer.get_latest().
  Kubernetes keeps its own copy.
- Delete ContainerStatsStreamError: zero raise-sites, only instantiated
  to harvest an error_code for a log line. Replace with a plain
  log.error.

Refs #11219
Refs #11224

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Add TODO(#11223) notes on CPU/Memory api_impl paths: after sysfs-first
  lands, the Docker stream on these plugins is only needed for network
  and blkio (which neither consumes), so the stream should migrate to
  a network/IO consumer.
- Add TODO(#11232) at the two per-plugin DockerStatsStreamer instantiation
  sites flagging the shared-streamer refactor.
- Add a test covering the DockerError 404 branch of _read_stream: reader
  exits cleanly on container-gone without spinning retries.

Refs #11219
Refs #11223
Refs #11224
Refs #11232

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jopemachine
Copy link
Copy Markdown
Member

For cumulative counters, if the Docker client in DockerStatsStreamer fails for an extended period, the same value may be repeatedly collected in latest. This can lead to a spike immediately after recovery.

jopemachine
jopemachine previously approved these changes Apr 22, 2026
@rapsealk rapsealk changed the title refactor(agent): Stream container stats from Docker instead of polling refactor(BA-5859): Stream container stats from Docker instead of polling Apr 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

comp:agent Related to Agent component size:XL 500~ LoC

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Stream container stats instead of per-cycle polling

3 participants